【streaming】hadoop-streaming实现
参考:https://hadoop.apache.org/docs/r1.0.4/cn/streaming.html
实质:在集群上执行的map-reduce流程,补充实现hive没有的功能
说明:任何可执行文件都可用作map、reduce文件
小例子:
- 当只有mapper功能时:针对超大表和小表的join类型的操作,可将小表存成内存文件格式,逐行匹配大表
# Locate the hadoop launcher once; every helper below reuses it.
# command -v is the portable replacement for `which`.
HADOOP_BIN=$(command -v hadoop)
echo "util.sh ${HADOOP_BIN}"
# Alias kept for backward compatibility with callers that reference $HADOOP.
HADOOP="${HADOOP_BIN}"
mydir=$(pwd)
# Abort the script if the previous command failed.
# Must be called IMMEDIATELY after the command to check: it inspects the
# exit status ($?) that is live on function entry, so capture it first
# before any other command inside the function can overwrite it.
# Returns: nothing on success; exits the whole script with status 1 on failure.
function myexit() {
local rv=$?
if [ "$rv" -ne 0 ]
then
# Diagnostics go to stderr so job output on stdout stays clean.
echo "exit, fatal" >&2
exit 1
fi
}
# Delete an HDFS path if it exists (clears stale intermediate job output).
# Globals:   HADOOP_BIN (read) — hadoop launcher binary
# Arguments: $1 - HDFS path to remove
# Exits the script (via myexit) if the delete command fails.
test_file_rmr() {
local target="$1"
${HADOOP_BIN} fs -test -e "$target"
if [ $? -ne 0 ];then
echo "[i see, hadoop output $target do not exist]"
else
# NOTE(review): fs -rmr is deprecated in Hadoop 2.x in favor of
# `fs -rm -r`; kept as-is to preserve existing behavior.
${HADOOP_BIN} fs -rmr "$target"
fi
# Aborts only if the branch's last command failed (echo never fails, so
# this effectively guards the -rmr call).
myexit
}
# Submit a map-only hadoop-streaming job: clear the output dir first, then
# run with gzip-compressed output and zero reducers.
# Globals:   HADOOP_BIN (read)
# Arguments:
#   $1 - HDFS input path
#   $2 - HDFS output path (deleted first via test_file_rmr if it exists)
#   $3 - mapper command line, e.g. "python stage1_combine_match.py"
#   $4 - local files to ship with the job; callers pass a string that may
#        embed additional "-file xxx" tokens, so $4 is INTENTIONALLY left
#        unquoted below to let word-splitting expand them — do not quote it
#   $5 - job name (mapred.job.name)
#   $6 - map task count, also used as map capacity
# NOTE(review): -jobconf is the deprecated spelling of -D in Hadoop 2.x and
# fs -dus of `fs -du -s`; both still work on 2.7.3 with a warning.
base_call_mapper() {
local out=$2
test_file_rmr $out
${HADOOP_BIN} jar /usr/local/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-input $1 \
-output $out \
-mapper "$3" \
-reducer None \
-file $4 \
-jobconf mapred.output.compress=true \
-jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-jobconf mapred.job.name=$5 \
-jobconf mapred.map.tasks=$6 \
-jobconf mapred.job.map.capacity=$6 \
-jobconf mapred.reduce.tasks=0
# Abort if the streaming job itself failed.
myexit
# Print the total size of the produced output for eyeballing.
${HADOOP_BIN} fs -dus $out
}
# Example invocation of base_call_mapper.
# NOTE(review): `local` is only valid inside a function, and $path/$util/
# $in_j/$out_j/$j are not defined in this snippet — presumably this was
# lifted from inside an enclosing function/loop; confirm against the
# original script before running standalone.
# $myfile piggybacks extra "-file ..." tokens onto the single -file slot,
# relying on unquoted word-splitting inside base_call_mapper.
local myfile="$path/data/ad_orentation_configration.txt -file $path/stage1_combine_match.py -file $util -file $path/reduce_index.py"
base_call_mapper "$in_j" "$out_j" "python stage1_combine_match.py" "$myfile" "ad_orentation_cover_user_"$j 800
# input为输入数据在hdfs上的位置
# output为输出数据在hdfs上位置
# $3 为执行mapper的操作,在此mapper为 python stage1_combine_match.py
# $4 为执行时需要把本地文件分发到集群的文件列表:在此包含 ad_orentation_configration.txt、stage1_combine_match.py、reduce_index.py
# $5 为job名称(mapred.job.name);$6 为map任务数及map并发度(mapred.map.tasks、mapred.job.map.capacity),含义同hive中的相应配置
注:
- 可以不包含reducer操作,只需 -reducer None,-jobconf mapred.reduce.tasks=0 即可
2. mapper和reducer均存在时:
# Submit a hadoop-streaming job with both a mapper and a reducer, writing
# through a custom multi-output format (adsfanstop.multiout from the local
# multiout.jar), with gzip-compressed output.
# Globals:   HADOOP_BIN (read), mydir (read — directory holding multiout.jar)
# Arguments:
#   $1 - HDFS input path
#   $2 - HDFS output path (deleted first via test_file_rmr if it exists)
#   $3 - mapper command line
#   $4 - reducer command line
#   $5 - local files to ship with the job; may embed additional "-file xxx"
#        tokens, so $5 is INTENTIONALLY left unquoted below — do not quote it
#   $6 - job name (mapred.job.name)
#   $7 - map task count / map capacity
#   $8 - reduce task count / reduce capacity
# NOTE(review): -jobconf and fs -dus are deprecated spellings (-D, -du -s)
# that still work on Hadoop 2.7.3 with a warning.
base_call_multi() {
local out=$2
test_file_rmr $out
${HADOOP_BIN} jar /usr/local/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-libjars ${mydir}/multiout.jar \
-input $1 \
-output $out \
-mapper "$3" \
-reducer "$4" \
-file $5 \
-outputformat adsfanstop.multiout \
-jobconf mapred.output.compress=true \
-jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-jobconf mapred.job.name=$6 \
-jobconf mapred.map.tasks=$7 \
-jobconf mapred.job.map.capacity=$7 \
-jobconf mapred.reduce.tasks=$8 \
-jobconf mapred.job.reduce.capacity=$8
# Abort if the streaming job itself failed.
myexit
# Print the total size of the produced output for eyeballing.
${HADOOP_BIN} fs -dus $out
}